diff --git a/Client/vs_lab02_grpc_log_service/pom.xml b/Client/vs_lab02_grpc_log_service/pom.xml index 2155483..3dd67fd 100644 --- a/Client/vs_lab02_grpc_log_service/pom.xml +++ b/Client/vs_lab02_grpc_log_service/pom.xml @@ -6,10 +6,10 @@ vslab2.src labor2vs - 1.0-SNAPSHOT + 1.0 - + io.grpc grpc-netty-shaded 1.63.0 @@ -79,7 +79,10 @@ false - vslab2.src.Main + vslab2.src.Main + + + META-INF/services/io.grpc.NameResolverProvider diff --git a/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/grpcStuff/GRPCInformation.java b/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/grpcStuff/GRPCInformation.java index 368c5bd..9d7445a 100644 --- a/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/grpcStuff/GRPCInformation.java +++ b/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/grpcStuff/GRPCInformation.java @@ -17,6 +17,8 @@ public class GRPCInformation { .usePlaintext() .build(); stub = LogServiceGrpc.newStub(channel); + System.out.println("-------------------------------------------------"); + System.out.println("Connection to server succesful."); } public LogServiceGrpc.LogServiceStub getStub() { diff --git a/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/input/InputThread.java b/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/input/InputThread.java index e0864dc..df5984d 100644 --- a/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/input/InputThread.java +++ b/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/input/InputThread.java @@ -31,7 +31,7 @@ public class InputThread extends Thread{ grpcInformation.initializeStub(clientInformation.getServerIP(), clientInformation.getServerPort()); while (inputThreadRunning) { - System.out.println("-----------------------------------------------"); + System.out.println("-------------------------------------------------"); System.out.println("Enter command:"); String inputString = null; try { diff --git a/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/input/commands/CommandFactory.java b/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/input/commands/CommandFactory.java index 9f45ec2..918952e 100644 --- a/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/input/commands/CommandFactory.java +++ b/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/input/commands/CommandFactory.java @@ -17,7 +17,8 @@ public class CommandFactory { switch (command) { case (Constants.ADD_LOG): { - return new CommandAddLog(commandStringParts[Constants.LOG_TEXT], clientInformation, grpcInformation); + String text = commandString.substring(Constants.GET_LOG.length() + 1, commandString.length()); + return new CommandAddLog(text, clientInformation, grpcInformation); } case (Constants.GET_LOG): { diff --git a/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/response/observers/AddLogEmptyResponseObserver.java b/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/response/observers/AddLogEmptyResponseObserver.java index 7ec2909..67dcc10 100644 --- a/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/response/observers/AddLogEmptyResponseObserver.java +++ b/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/response/observers/AddLogEmptyResponseObserver.java @@ -13,13 +13,13 @@ public class AddLogEmptyResponseObserver implements StreamObserver { @Override public void onError(Throwable t) { - System.out.println("-----------------------------------------------"); + System.out.println("-------------------------------------------------"); System.out.println("Error at AddLogEmptyResponseObserver.\n" + t.toString()); } @Override public void onCompleted() { - System.out.println("-----------------------------------------------"); + System.out.println("-------------------------------------------------"); System.out.println("AddLogEmptyResponseObserver closed."); } diff --git a/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/response/observers/GetLogLogsResponseObserver.java b/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/response/observers/GetLogLogsResponseObserver.java index 19b5da6..4553f02 100644 --- a/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/response/observers/GetLogLogsResponseObserver.java +++ b/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/response/observers/GetLogLogsResponseObserver.java @@ -8,23 +8,22 @@ public class GetLogLogsResponseObserver implements StreamObserver { @Override public void onNext(Logs value) { - System.out.println("-----------------------------------------------"); - System.out.println("Row Number\tUser Id\tText\n"); - + System.out.println("-------------------------------------------------"); + System.out.println("| Row Number\t| User Id\t| Text\t\t|"); for (Log log : value.getLogsList()) { - System.out.format("%010d\t%07d\t%s", log.getRowNumber(), log.getUserId(), log.getText()); + System.out.println("| " + log.getRowNumber() + "\t\t| " + log.getUserId() + "\t\t| " + log.getText() + "\t\t|"); } } @Override public void onError(Throwable t) { - System.out.println("-----------------------------------------------"); + System.out.println("-------------------------------------------------"); System.err.println("Error at GetLogLogsResponseObserver.\n" + t.toString()); } @Override public void onCompleted() { - System.out.println("-----------------------------------------------"); + System.out.println("-------------------------------------------------"); System.out.println("GetLogLogsResponseObserver was closed."); } diff --git a/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/response/observers/ListenLogLogStreamResponseObserver.java b/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/response/observers/ListenLogLogStreamResponseObserver.java index be7eb09..2769535 100644 --- a/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/response/observers/ListenLogLogStreamResponseObserver.java +++ b/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/response/observers/ListenLogLogStreamResponseObserver.java @@ -7,19 +7,21 @@ public class ListenLogLogStreamResponseObserver implements StreamObserver { @Override public void onNext(Log log) { - System.out.println("-----------------------------------------------"); - System.out.format("%010d\t%07d\t%s", log.getRowNumber(), log.getUserId(), log.getText()); + System.out.println("-------------------------------------------------"); + System.out.println("Received new log:"); + System.out.println("| Row number\t| User Id\t| Text\t\t|"); + System.out.println("| " + log.getRowNumber() + "\t| " + log.getUserId() + "\t| " + log.getText() + "\t\t|"); } @Override public void onError(Throwable t) { - System.out.println("-----------------------------------------------"); + System.out.println("-------------------------------------------------"); System.err.println("Error at ListenLogLogStreamResponseObserver.\n" + t.toString()); } @Override public void onCompleted() { - System.out.println("-----------------------------------------------"); + System.out.println("-------------------------------------------------"); System.out.println("ListenLogLogStreamResponseObserver closed."); } diff --git a/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/response/observers/UnlistenLogEmptyResponseObserver.java b/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/response/observers/UnlistenLogEmptyResponseObserver.java index fddb891..5c30e69 100644 --- a/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/response/observers/UnlistenLogEmptyResponseObserver.java +++ b/Client/vs_lab02_grpc_log_service/src/main/java/vslab2/src/response/observers/UnlistenLogEmptyResponseObserver.java @@ -7,19 +7,19 @@ public class UnlistenLogEmptyResponseObserver implements StreamObserver{ @Override public void onNext(Empty value) { - System.out.println("-----------------------------------------------"); + System.out.println("-------------------------------------------------"); System.out.println("Unsubscribed from log stream."); } @Override public void onError(Throwable t) { - System.out.println("-----------------------------------------------"); + System.out.println("-------------------------------------------------"); System.err.println("Error at UnlistenLogEmptyResponseObserver.\n" + t.toString()); } @Override public void onCompleted() { - System.out.println("-----------------------------------------------"); + System.out.println("-------------------------------------------------"); System.out.println("UnlistenLogEmptyResponseObserver closed."); } diff --git a/Server/vs_lab02_grpc_log_service/.vscode/settings.json b/Server/vs_lab02_grpc_log_service/.vscode/settings.json new file mode 100644 index 0000000..a503fa2 --- /dev/null +++ b/Server/vs_lab02_grpc_log_service/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "java.configuration.updateBuildConfiguration": "disabled", + "java.compile.nullAnalysis.mode": "automatic" +} \ No newline at end of file diff --git a/Server/vs_lab02_grpc_log_service/pom.xml b/Server/vs_lab02_grpc_log_service/pom.xml new file mode 100644 index 0000000..b93bffc --- /dev/null +++ b/Server/vs_lab02_grpc_log_service/pom.xml @@ -0,0 +1,94 @@ + + + 4.0.0 + + vslab2.src + labor2vs + 1.0-SNAPSHOT + + + + io.grpc + grpc-netty-shaded + 1.63.0 + runtime + + + io.grpc + grpc-protobuf + 1.63.0 + + + io.grpc + grpc-stub + 1.63.0 + + + org.apache.tomcat + annotations-api + 6.0.53 + provided + + + + + 17 + 17 + + + + + + kr.motd.maven + os-maven-plugin + 1.6.2 + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:3.9.0:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:1.24.0:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + false + + + vslab2.src.Main + + + META-INF/services/io.grpc.NameResolverProvider + + + + + + + + + \ No newline at end of file diff --git a/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/Log/LogService.java b/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/Log/LogService.java new file mode 100644 index 0000000..e6271de --- /dev/null +++ b/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/Log/LogService.java @@ -0,0 +1,84 @@ +package vslab2.src.Log; + +import io.grpc.stub.StreamObserver; +import vslab2.src.User.LogUser; +import vslab2.src.User.LogUserRegistry; +import vslab2.src.grpc.LogOuterClass.Empty; +import vslab2.src.grpc.LogOuterClass.Log; +import vslab2.src.grpc.LogOuterClass.Logs; +import vslab2.src.grpc.LogOuterClass.Logs.Builder; +import vslab2.src.grpc.LogOuterClass.User; +import vslab2.src.grpc.LogServiceGrpc.LogServiceImplBase; + +public class LogService extends LogServiceImplBase { + + private ServerLogs logs = new ServerLogs(); + private LogUserRegistry users = new LogUserRegistry(); + + @Override + public StreamObserver addLog(StreamObserver responseObserver) { + return new StreamObserver() { + @Override + public void onNext(Log log) { + // Add now log to logs. + logs.addLog(new ServerLog(log.getUserId(), log.getText())); + Log logToSend = Log.newBuilder() + .setRowNumber(ServerLog.getRowNumberCounter()-1) + .setUserId(log.getUserId()) + .setText(log.getText()) + .build(); + // Send logs to clients + for (Object subscribedUser : users.getRegisteredUsers().toArray()) { + ((LogUser)subscribedUser).getStreamObserver().onNext(logToSend); + } + } + + @Override + public void onError(Throwable t) { + System.err.println("Error at LogService:addLog,\n" + t.toString()); + } + + @Override + public void onCompleted() { + // Send Empty response to client to indicate, that stream to add messages was closed. + responseObserver.onCompleted(); + } + }; + } + + @Override + public void getLog(Empty request, StreamObserver responseObserver) { + Builder logsBuilder = Logs.newBuilder(); + + // Add logs to builder + for (ServerLog log : logs.getServerLogs()) { + logsBuilder.addLogs( + Log.newBuilder() + .setRowNumber(log.getRowNumber()) + .setUserId(log.getUserID()) + .setText(log.getLogText()) + .build()); + } + // Create response message + Logs logsResponse = logsBuilder.build(); + + // Send response and close response observer + responseObserver.onNext(logsResponse); + responseObserver.onCompleted(); + } + + @Override + public void listenLog(User request, StreamObserver responseObserver) { + // Add user to UserRegistry, which administrates all users with their response observer streams + users.registerUser(new LogUser(request.getUserId(), responseObserver)); + } + + @Override + public void unlistenLog(User request, StreamObserver responseObserver) { + // Remove user from registry and close stream to client of user + StreamObserver listenedStreamObserver = users.unregisterUser(new LogUser(request.getUserId(), null)); + if (listenedStreamObserver != null) { + listenedStreamObserver.onCompleted(); + } + } +} diff --git a/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/Log/ServerLog.java b/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/Log/ServerLog.java new file mode 100644 index 0000000..a77930b --- /dev/null +++ b/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/Log/ServerLog.java @@ -0,0 +1,39 @@ +package vslab2.src.Log; + +public class ServerLog { + private static int rowNumberCounter = 0; + + private int rowNumber = 0; + private String userID = null; + private String logText = null; + + public ServerLog(String userID, String logText) { + rowNumber = rowNumberCounter++; + if (userID != null) { + this.userID = userID; + } else { + this.userID = "undefined"; + } + if (logText != null) { + this.logText = logText; + } else { + this.logText = "undefined"; + } + } + + public static int getRowNumberCounter() { + return rowNumberCounter; + } + + public int getRowNumber() { + return rowNumber; + } + + public String getUserID() { + return userID; + } + + public String getLogText() { + return logText; + } +} diff --git a/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/Log/ServerLogs.java b/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/Log/ServerLogs.java new file mode 100644 index 0000000..c368d99 --- /dev/null +++ b/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/Log/ServerLogs.java @@ -0,0 +1,22 @@ +package vslab2.src.Log; + +import java.util.ArrayList; +import java.util.List; + +public class ServerLogs { + private List logs = null; + + public ServerLogs() { + logs = new ArrayList(); + } + + public void addLog(ServerLog log) { + if (log != null) { + logs.add(log); + } + } + + public List getServerLogs() { + return logs; + } +} diff --git a/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/Main.java b/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/Main.java new file mode 100644 index 0000000..bf7a673 --- /dev/null +++ b/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/Main.java @@ -0,0 +1,27 @@ +package vslab2.src; + +import java.io.IOException; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import vslab2.src.Log.LogService; + +public class Main { + public static void main(String[] args) { + Server server = ServerBuilder.forPort(9090).addService(new LogService()).build(); + + try { + server.start(); + } catch (IOException e) { + e.printStackTrace(); + } + + System.out.println("Server is online at port: " + server.getPort()); + + try { + server.awaitTermination(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/User/LogUser.java b/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/User/LogUser.java new file mode 100644 index 0000000..4efa7ba --- /dev/null +++ b/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/User/LogUser.java @@ -0,0 +1,23 @@ +package vslab2.src.User; + +import io.grpc.stub.StreamObserver; +import vslab2.src.grpc.LogOuterClass.Log; + +public class LogUser { + private String id = null; + + private StreamObserver userResponseStreamObserver = null; + + public LogUser(String id, StreamObserver observer) { + this.id = id; + this.userResponseStreamObserver = observer; + } + + public boolean equals(LogUser other) { + return this.id.equals(other.id); + } + + public StreamObserver getStreamObserver() { + return userResponseStreamObserver; + } +} diff --git a/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/User/LogUserRegistry.java b/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/User/LogUserRegistry.java new file mode 100644 index 0000000..5cc0515 --- /dev/null +++ b/Server/vs_lab02_grpc_log_service/src/main/java/vslab2/src/User/LogUserRegistry.java @@ -0,0 +1,39 @@ +package vslab2.src.User; + +import java.util.HashSet; +import java.util.Set; + +import io.grpc.stub.StreamObserver; +import vslab2.src.grpc.LogOuterClass.Log; + +public class LogUserRegistry { + private Set users = null; + + public LogUserRegistry() { + users = new HashSet(); + } + + public Set getRegisteredUsers() { + return users; + } + + public void registerUser(LogUser user) { + if (user != null) { + users.add(user); + } + } + + public StreamObserver unregisterUser(LogUser user) { + StreamObserver streamObserver = null; + if (user != null) { + for (Object logUser : users.toArray()) { + if (logUser.equals(user)) { + streamObserver = ((LogUser)logUser).getStreamObserver(); + break; + } + } + users.remove(user); + } + return streamObserver; + } +} diff --git a/Server/vs_lab02_grpc_log_service/src/main/proto/log.proto b/Server/vs_lab02_grpc_log_service/src/main/proto/log.proto new file mode 100644 index 0000000..3bff730 --- /dev/null +++ b/Server/vs_lab02_grpc_log_service/src/main/proto/log.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; +package vslab2.src.grpc; + +message Log { + int32 row_number = 1; + string user_id = 2; + string text = 3; +} + +message Logs { + repeated Log logs = 1; +} + +message Empty {} + +message User { + string user_id = 1; +} + +service LogService { + rpc AddLog(stream Log) returns (Empty); + rpc GetLog(Empty) returns (Logs); + rpc ListenLog(User) returns (stream Log); + rpc UnlistenLog(User) returns (Empty); +} \ No newline at end of file